Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: py
title: Minio Python
#!/usr/bin/python3
from keycloak import KeycloakOpenID
import requests
import xmltodict
from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou, BucketAlreadyExists)
import uuid
import io

def get_token():
    """Request a token from Keycloak using the client-credentials grant.

    Returns:
        The value returned by ``KeycloakOpenID.token`` for the ``minio-cli``
        client in the ``minio`` realm.
    """
    # NOTE(review): the client secret is hard-coded here; presumably fine for
    # a documentation sample, but confirm before reusing this elsewhere.
    oidc_settings = {
        "server_url": "http://keycloak:8080",
        "client_id": "minio-cli",
        "realm_name": "minio",
        "client_secret_key": "YVZGoUdprHmqexlX30stzGWwZtT2SNll",
    }
    oidc_client = KeycloakOpenID(**oidc_settings)
    return oidc_client.token(grant_type='client_credentials')

def get_credentials():
    """Exchange a Keycloak token for temporary MinIO credentials.

    Fetches a token via ``get_token()`` and posts it to the MinIO endpoint
    with the ``AssumeRoleWithWebIdentity`` action, then parses the XML reply.

    Returns:
        dict: The ``Credentials`` element of the STS response.

    Raises:
        requests.HTTPError: If MinIO answers with a 4xx/5xx status.
        requests.Timeout: If MinIO does not answer within the timeout.
        KeyError: If the token or the response lacks an expected field.
    """
    token = get_token()
    # NOTE(review): this reads 'id_token' from a client_credentials response;
    # confirm the Keycloak client is configured to return one.
    r = requests.post("http://minio:9000",
                      data={
                          'Action': "AssumeRoleWithWebIdentity",
                          'Version': "2011-06-15",
                          'WebIdentityToken': token['id_token'],
                          # Ask for credentials that live as long as the token.
                          'DurationSeconds': token['expires_in']
                      },
                      # Without a timeout an unreachable server hangs the script.
                      timeout=30)
    # Fail on an HTTP error status here instead of letting the error body be
    # parsed and surface later as an unrelated KeyError.
    r.raise_for_status()
    tree = xmltodict.parse(r.content)
    return dict(tree['AssumeRoleWithWebIdentityResponse']['AssumeRoleWithWebIdentityResult']['Credentials'])

# Retrieve the temporary credentials returned by get_credentials()
credenstials = get_credentials()

# Build the MinIO client from those credentials; secure=False means the
# endpoint is reached over plain HTTP.
client = Minio(
    "minio:9000",
    secure=False,
    access_key=credenstials['AccessKeyId'],
    secret_key=credenstials['SecretAccessKey'],
    session_token=credenstials['SessionToken'],
    region=None,
    http_client=None,
    credentials=None,
)

# Ensure the target bucket exists, creating it when it is missing
print("Create Bucket", "-------------", sep="\n")
bucket_name = "py-bucket"
found = client.bucket_exists(bucket_name)
if found:
    print(f"Bucket '{bucket_name}' already exists")
else:
    client.make_bucket(bucket_name)
    print(f"Bucket '{bucket_name}' created")

buckets = client.list_buckets()

# Print the name and creation date of every bucket returned above
print("\nBucket List", "-----------", sep="\n")
for bucket in buckets:
    print(f"{bucket.name} created on {bucket.creation_date}")

# Upload an in-memory object to the bucket
data = "I want to stream some data to minio"
object_bytes = data.encode('utf-8')
# put_object is given a stream plus its length, so wrap the bytes in a
# file-like object and pass len(object_bytes) alongside it.
object_stream = io.BytesIO(object_bytes)
# A fresh UUID gives each run its own object name.
object_name = f"{uuid.uuid1()}.txt"

print("\nUpload To Bucket")
print("----------------")
# Errors from put_object are deliberately left to propagate so the script
# stops with the full traceback; nothing here can recover from them.
client.put_object(bucket_name, object_name, object_stream, len(object_bytes))
print(f"{object_name} uploaded to {bucket_name}")

...